<?php

namespace App\Process;

use App\Model\FormidsModel;
use App\Model\RecordsModel;
use App\Model\TmplmsgsModel;
use App\Utility\MiniProgram;
use App\Utility\RedisTools;
use Carbon\Carbon;
use EasySwoole\EasySwoole\Logger;
use EasySwoole\EasySwoole\Swoole\Process\AbstractProcess;
use Swoole\Process;

/**
 * Background process that consumes the "message-push" Redis queue and sends
 * one WeChat mini-program template message per queued task.
 *
 * A timer fires every TICK_INTERVAL_MS milliseconds. When no consumer
 * coroutine is alive, it starts WORKER_COUNT coroutines that pop tasks until
 * the queue is empty (or an error occurs) and then exit. The next tick starts
 * a fresh batch.
 */
class MessagePush extends AbstractProcess
{
    /** Name passed to RedisTools to select the queue to consume. */
    private const QUEUE_NAME = 'message-push';

    /** Interval, in milliseconds, between checks for pending work. */
    private const TICK_INTERVAL_MS = 100;

    /** Number of consumer coroutines started per batch. */
    private const WORKER_COUNT = 2;

    /**
     * WeChat error codes after which the stored form id record is deleted.
     * NOTE(review): presumably "invalid form_id" / "form_id already used" —
     * confirm against the WeChat error code table.
     */
    private const DISCARD_FORMID_ERRCODES = [41028, 41029];

    /**
     * Number of consumer coroutines currently alive.
     *
     * A counter is used instead of a single boolean so that one coroutine
     * finishing early cannot mark the whole batch as finished while its
     * sibling is still sending, which would let the next tick spawn extra
     * workers.
     *
     * @var int
     */
    private $activeWorkers = 0;

    /**
     * Process entry point: registers the periodic queue check.
     *
     * @param Process $process The underlying Swoole process (unused here).
     */
    public function run(Process $process)
    {
        $this->addTick(self::TICK_INTERVAL_MS, function () {
            // A previous batch is still draining the queue; do not pile up workers.
            if ($this->activeWorkers > 0) {
                return;
            }
            $this->spawnWorkers();
        });
    }

    public function onShutDown()
    {
        // TODO: Implement onShutDown() method.
    }

    public function onReceive(string $str, ...$args)
    {
        // TODO: Implement onReceive() method.
    }

    /**
     * Starts WORKER_COUNT coroutines, each draining the queue through its own
     * RedisTools instance.
     */
    private function spawnWorkers()
    {
        for ($i = 0; $i < self::WORKER_COUNT; $i++) {
            $queue = new RedisTools(self::QUEUE_NAME);

            // Count the worker before it starts: the coroutine body may run
            // (and even finish) before Co::create() returns.
            $this->activeWorkers++;
            $cid = \Co::create(function () use ($queue) {
                try {
                    $this->drainQueue($queue);
                } finally {
                    $this->activeWorkers--;
                }
            });

            if ($cid === false) {
                // The coroutine never ran, so its finally block will not
                // release the slot; release it here to avoid stalling forever.
                $this->activeWorkers--;
            }
        }
    }

    /**
     * Pops and processes tasks until the queue is empty or a task throws.
     *
     * On an exception the worker logs the error and stops; the periodic tick
     * starts a new batch later, which avoids spinning on a persistent failure.
     *
     * @param RedisTools $queue Queue wrapper to pop tasks from.
     */
    private function drainQueue($queue)
    {
        while (true) {
            try {
                // Original author's note: rPop is used (rather than lPop) so
                // that tasks are consumed in first-in-first-out order.
                $task = $queue->rPop();
                if (!$task) {
                    return;
                }
                $this->pushMessage($task);
            } catch (\Throwable $throwable) {
                Logger::getInstance()->log(
                    'message-push worker stopped: ' . $throwable->getMessage()
                    . ' at ' . $throwable->getFile() . ':' . $throwable->getLine()
                );
                return;
            }
        }
    }

    /**
     * Sends the template message described by one queued task and updates the
     * send record and the stored form id according to the outcome.
     *
     * @param mixed $task Queued task; the code reads the keys template_id,
     *                    openid, formid, id and record_id from it.
     */
    private function pushMessage($task)
    {
        $msgModel = new TmplmsgsModel();
        $msg = $msgModel->where('template_id', $task['template_id'])->first();

        $recordModel = new RecordsModel();
        $recordModel->update(['id' => $task['record_id'], 'end_at' => Carbon::now()]);

        if (empty($msg)) {
            // No template configuration for this task: there is nothing valid
            // to send, so count it as failed instead of calling WeChat with
            // empty fields. The form id is kept because it was not consumed.
            Logger::getInstance()->log(
                'message-push: template not found: ' . $task['template_id']
            );
            $recordModel->inc($task['record_id'], 'faild_number');
            return;
        }

        // When the template defines an emphasised keyword, WeChat expects it
        // in the "<keyword>.DATA" form.
        $emphasis_keyword = '';
        if ($msg['emphasis']) {
            $emphasis_keyword = $msg['emphasis'] . '.DATA';
        }

        $sendData = [
            'touser' => $task['openid'],
            'template_id' => $msg['template_id'],
            'page' => $msg['page'],
            'form_id' => $task['formid'],
            'data' => $msg['keywords'][0],
            'emphasis_keyword' => $emphasis_keyword,
        ];

        $wechat = new MiniProgram();
        $result = $wechat->sendTemplateMessage($sendData);

        // A missing/empty response must not be treated as success: with a
        // loose comparison, null == 0 would be true.
        $errcode = isset($result['errcode']) ? (int) $result['errcode'] : null;

        $formModel = new FormidsModel();
        if ($errcode === 0) {
            // Sent successfully: delete the stored form id record.
            $formModel->destroy($task['id']);
            $recordModel->inc($task['record_id'], 'success_number');
            return;
        }

        if (in_array($errcode, self::DISCARD_FORMID_ERRCODES, true)) {
            $formModel->destroy($task['id']);
        }
        // 'faild_number' (sic) is the existing counter name used by RecordsModel callers.
        $recordModel->inc($task['record_id'], 'faild_number');
    }
}
